package com.example.flink_demo.job;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.types.Row;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.Timestamp;

/**
 * @author benjamin_5
 * @Description Streaming job that reads rows from the database and counts patients per diagnosis.
 * @date 2024/9/25
 */
public class ReadFromDbJob {
    private static final Log logger = LogFactory.getLog(ReadFromDbJob.class);

    /**
     * Entry point: reads (name, diagnose_name) rows from MySQL, counts occurrences
     * per diagnosis with a keyed running sum, and writes each updated count into
     * the {@code world_count} table.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        try {
            // Set up the streaming execution environment.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Fixed parallelism of 3 (the default would be the machine's core count).
            env.setParallelism(3);

            // NOTE(review): credentials are hard-coded; move to args/config for production use.
            String jdbcUrl = "jdbc:mysql://localhost:3306/test_db";
            String user = "root";
            String password = "123456";
            String driverName = "com.mysql.cj.jdbc.Driver";
            String query = "SELECT name, diagnose_name FROM underwriting_data where diagnose_name is not null";

            // Legacy batch-style JDBC input format (deprecated in newer Flink releases;
            // consider the flink-connector-jdbc source when upgrading).
            JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                    .setDrivername(driverName)
                    .setDBUrl(jdbcUrl)
                    .setUsername(user)
                    .setPassword(password)
                    .setQuery(query)
                    .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
                    .finish();

            DataStreamSource<Row> stream = env.createInput(jdbcInputFormat);

            // Count per diagnosis: map each row to (diagnose_name, 1), key by the
            // diagnosis, then keep a running sum of the counts.
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = stream
                    .map(new MapFunction<Row, Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> map(Row value) throws Exception {
                            // Column 1 is diagnose_name; column 0 (patient name) is not needed here.
                            String diagnoseName = (String) value.getField(1);
                            return Tuple2.of(diagnoseName, 1);
                        }
                    })
                    // One explicit type hint is sufficient; the original declared it twice.
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(value -> value.f0)
                    .sum(1);

            // Each aggregation update emits a new row; use the typed f0/f1 fields of
            // Tuple2 instead of the untyped getField(int) accessor.
            result.addSink(JdbcSink.sink(
                    "insert into world_count (name,number,create_time) values (?,?,?)",
                    (ps, t) -> {
                        ps.setString(1, t.f0);
                        ps.setInt(2, t.f1);
                        ps.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
                    },
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl(jdbcUrl)
                            .withUsername(user)
                            .withPassword(password)
                            .withDriverName(driverName)
                            .build()
            ));

            // execute() blocks until the job finishes, so log the submission here
            // (the original printed "done" before the job had even started).
            logger.info("Submitting readFromDb stream job");
            env.execute("readFromDb stream job");

        } catch (Exception e) {
            // logger.error(msg, e) already records the full stack trace;
            // the extra printStackTrace() was redundant.
            logger.error("流任务执行失败：", e);
        }
    }
}
